Source code for hysop.operator.hdf_io

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""I/O operators

.. currentmodule:: hysop.operator.hdf_io

* :class:`~HDF_Writer` : operator to write fields into an hdf file
* :class:`~HDF_Reader` : operator to read fields from an hdf file
* :class:`~HDF_IO` : abstract interface for hdf io classes

"""
import subprocess
import sys
import os
import functools
from abc import ABCMeta, abstractmethod

try:
    import h5py
except ImportError as e:
    h5py = None
    msg = "Warning: h5py not found, you may not be able to"
    msg += " use hdf5 I/O functionnalities."
    print(msg)
    raise

from hysop import __H5PY_PARALLEL_COMPRESSION_ENABLED__, vprint
from hysop.core.graph.graph import discretized
from hysop.constants import (
    DirectionLabels,
    HYSOP_REAL,
    Backend,
    TranspositionState,
    MemoryOrdering,
)
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.tools.io_utils import IO, IOParams, XMF
from hysop.core.graph.graph import op_apply
from hysop.backend.host.host_operator import HostOperatorBase
from hysop.fields.continuous_field import Field
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.core.memory.memory_request import MemoryRequest
from hysop.topology.topology_descriptor import TopologyDescriptor


class HDF_IO(HostOperatorBase, metaclass=ABCMeta):
    """
    Abstract interface to read/write hysop fields from/to hdf files.
    """

    @classmethod
    def supported_backends(cls):
        """
        Return the backends that this operator's topologies can support.
        """
        return Backend.all

    def __new__(
        cls, var_names=None, name_prefix="", name_postfix="", force_backend=None, **kwds
    ):
        return super().__new__(cls, **kwds)

    def __init__(
        self,
        var_names=None,
        name_prefix="",
        name_postfix="",
        force_backend=None,
        **kwds,
    ):
        """Read/write some fields data from/into hdf/xdmf files. Parallel io.

        Parameters
        ----------
        var_names : dict, optional
            keys = :class:`~hysop.fields.continuous_field.Field`,
            values = string, field name. See notes below.
        name_prefix: str, optional
            Optional name prefix for variables.
        name_postfix: str, optional
            Optional name postfix for variables.
        force_backend: hysop.constants.Backend
            Force the source backend for fields.
        kwds: dict
            Base class arguments.

        Notes
        -----
        Datasets in hdf files are identified by names. In hysop, when writing
        a file, the default dataset name is
        'continuous_field.name + topo.id + component direction', but this can
        be changed with the var_names argument.
        For example, if input_fields=[velo, vorti] and the hdf file contains
        'vel_1_X, vel_1_Y, vel_1_Z, dat_2_X, dat_2_Y, dat_2_Z' keys, then use:
        var_names = {velo: 'vel', vorti: 'dat'} if you want to read vel/dat
        into velo/vorti.
        """
        check_instance(var_names, dict, keys=Field, values=str, allow_none=True)
        super().__init__(**kwds)
        self.name_prefix = name_prefix
        self.name_postfix = name_postfix

        if h5py is None:
            msg = "You are trying to use the HDF5 I/O operators but the h5py"
            msg += " module has not been found on your system."
            raise RuntimeError(msg)

        # If no filename is given, set it to
        # the concatenation of input_fields' names.
        if self.io_params is None:
            # if name is not set, name = concatenation of fields names,
            # like v1_v2.
            # WARNING FP: we must sort names (alph. order), else
            # it seems the order may change from one mpi process
            # to another.
            name = ""
            names = []
            for var in self.input_fields:
                names.append(var.name)
            names.sort()
            for nn in names:
                name += nn + "_"
            name = name[:-1]
            self.io_params = IOParams(name, fileformat=IO.HDF5)
        else:
            assert self.io_params.fileformat is IO.HDF5

        # Dictionary of names to search in hdf file. May be None.
        # It will be checked during setup.
        self.var_names = {} if var_names is None else var_names

        # Local topology, that MUST be common to all input_fields.
        self.topology = None
        self._local_compute_slices = None
        self._global_grid_resolution = None
        self._local_grid_resolution = None
        self._all_local_grid_resolution = None
        self._global_slices = None
        # Dictionary of discrete fields. Key = name in hdf file,
        # Value = discrete field
        self.dataset = {}
        # Get hdf file name. Depends on read/write process. Must be
        # defined in HDF_READER or HDF_WRITER init.
        self._get_filename = lambda i=None: None
        # File object that holds the hdf file
        self._hdf_file = None

        # field backend
        self._force_backend = first_not_None(force_backend, Backend.HOST)
        td_kwds = {}
        if force_backend is Backend.OPENCL:
            assert "cl_env" in kwds
            td_kwds["cl_env"] = kwds.pop("cl_env")
        self._td_kwds = td_kwds
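
    # Hedged usage sketch (editorial illustration, not part of the module):
    # mapping custom dataset names to hysop fields through ``var_names``.
    # The fields ``velo`` and ``vorti`` and the topology descriptor ``topo``
    # are assumed to be defined by the calling problem.
    #
    #     # read datasets named 'vel*' and 'dat*' into velo and vorti:
    #     reader = HDF_Reader(variables={velo: topo, vorti: topo},
    #                         var_names={velo: 'vel', vorti: 'dat'},
    #                         io_params=IOParams('fields', fileformat=IO.HDF5))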

    @debug
    def create_topology_descriptors(self):
        """
        Called in get_field_requirements, just after handle_method.

        Topology requirements (or descriptors) are:
            1) min and max ghosts for each input and output variables
            2) allowed splitting directions for cartesian topologies
        """
        # Here we recreate TopologyDescriptors to allow a forced backend,
        # like an OpenCL mapped memory backend, or when we do not want
        # to allocate memory for a topology that is just used for I/O.
        td_kwds = self._td_kwds
        for field, topo_descriptor in self.input_fields.items():
            topo_descriptor = TopologyDescriptor.build_descriptor(
                backend=self._force_backend,
                operator=self,
                field=field,
                handle=topo_descriptor,
                **td_kwds,
            )
            self.input_fields[field] = topo_descriptor

        for field, topo_descriptor in self.output_fields.items():
            topo_descriptor = TopologyDescriptor.build_descriptor(
                backend=self._force_backend,
                operator=self,
                field=field,
                handle=topo_descriptor,
                **td_kwds,
            )
            self.output_fields[field] = topo_descriptor

    @debug
    def get_field_requirements(self):
        # set good transposition state
        requirements = super().get_field_requirements()
        for is_input, ireq in requirements.iter_requirements():
            if ireq is None:
                continue
            (field, td, req) = ireq
            req.memory_order = MemoryOrdering.C_CONTIGUOUS
            req.axes = (TranspositionState[field.dim].default_axes(),)
        return requirements

    def get_node_requirements(self):
        node_reqs = super().get_node_requirements()
        node_reqs.enforce_unique_transposition_state = True
        node_reqs.enforce_unique_topology_shape = True
        node_reqs.enforce_unique_memory_order = True
        node_reqs.enforce_unique_ghosts = False
        return node_reqs

    def discretize(self):
        super().discretize()
        topo = self.input_fields[
            next(iter(sorted(self.input_fields.keys(), key=lambda f: f.name)))
        ]
        use_local_hdf5 = topo.cart_size == 1
        use_local_hdf5 |= (
            (topo.proc_shape[0] == topo.cart_size)
            and (topo.cart_size <= 10)
            and (not self.io_params.hdf5_disable_slicing)
        )
        # XDMF JOIN does not support more than 10 arguments

        self.topology = topo
        self.use_local_hdf5 = use_local_hdf5
        self.use_parallel_hdf5 = not use_local_hdf5

        refmesh = topo.mesh

        # Global resolution for hdf5 output
        self._global_grid_resolution = refmesh.grid_resolution
        # Local resolution for hdf5 output
        self._local_grid_resolution = refmesh.compute_resolution

        assert self.io_params.io_leader < topo.cart_comm.size
        self._all_local_grid_resolution = topo.cart_comm.gather(
            self._local_grid_resolution, root=self.io_params.io_leader
        )

        local_compute_slices = {}
        global_compute_slices = {}
        for field, itopo in self.input_fields.items():
            mesh = itopo.mesh
            assert topo.domain._domain is itopo.domain._domain, "domain mismatch"
            assert npw.array_equal(
                refmesh.grid_resolution, mesh.grid_resolution
            ), "global grid resolution mismatch"
            assert mesh.on_proc == refmesh.on_proc
            if mesh.on_proc:
                local_compute_slices[field] = mesh.local_compute_slices
                global_compute_slices[field] = mesh.global_compute_slices
            else:
                local_compute_slices[field] = tuple(
                    slice(0, 0) for _ in range(self.domain.dim)
                )
                global_compute_slices[field] = tuple(
                    slice(0, 0) for _ in range(self.domain.dim)
                )
        self._local_compute_slices = local_compute_slices
        self._global_compute_slices = global_compute_slices
        self.refmesh = refmesh

        name_prefix, name_postfix = self.name_prefix, self.name_postfix

        fields_names = {}
        # Get field names and initialize the dataset dict.
        for df in self.discrete_fields:
            df_name = df.name
            if df.continuous_fields()[0] in self.var_names.keys():
                df_name = self.var_names[df.continuous_fields()[0]]
            for d in range(df.nb_components):
                if df.nb_components == 1:
                    name = name_prefix + df_name + name_postfix
                else:
                    name = name_prefix + df_name + f"_{d}" + name_postfix
                self.dataset[name] = df.data[d]
                fields_names[df.field] = name
        self.fields_names = fields_names
        for f, name in self.fields_names.items():
            assert f in self._local_compute_slices
            assert f in self._global_compute_slices
            self._local_compute_slices[name] = self._local_compute_slices[f]
            self._global_compute_slices[name] = self._global_compute_slices[f]
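
    # Hedged illustration (editorial addition, not part of the module) of the
    # dataset naming scheme built in discretize() above, assuming empty
    # name_prefix/name_postfix and hypothetical fields named 'pressure'
    # (scalar) and 'velocity' (3 components):
    #
    #     sorted(self.dataset)
    #     # -> ['pressure', 'velocity_0', 'velocity_1', 'velocity_2']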

    def open_hdf(self, count, mode, compression="gzip"):
        filename = self._get_filename(count)
        if self.topology.cart_size == 1:
            self._hdf_file = h5py.File(filename, mode)
        elif self.use_parallel_hdf5:
            self._hdf_file = h5py.File(
                filename, mode, driver="mpio", comm=self.topology.comm
            )
            # disable compression if the hdf5 library version is less
            # than 1.10.2 (checked in CMakeLists.txt).
            if not __H5PY_PARALLEL_COMPRESSION_ENABLED__:
                compression = None
        else:
            filename = filename.format(rk=self.topology.cart_rank)
            self._hdf_file = h5py.File(filename, mode)

        if self.io_params.hdf5_disable_compression:
            compression = None

        return (filename, compression)

    @classmethod
    def supports_multiple_topologies(cls):
        return True

    @classmethod
    def supports_mpi(cls):
        return True


class HDF_Writer(HDF_IO):
    """
    Print field(s) values on a given topo, in HDF5 format.
    """

    __xmf_header = """<?xml version=\"1.0\" ?>
<!DOCTYPE Xdmf SYSTEM \"Xdmf.dtd\">
<Xdmf Version=\"2.0\">
 <Domain>
  <Grid Name=\"CellTime\" GridType=\"Collection\" CollectionType=\"Temporal\">
"""

    __xmf_footer = """  </Grid>
 </Domain>
</Xdmf>
"""

    def __new__(cls, variables, name=None, pretty_name=None, **kwds):
        return super().__new__(
            cls,
            input_fields=None,
            output_fields=None,
            name=name,
            pretty_name=pretty_name,
            **kwds,
        )

    def __init__(self, variables, name=None, pretty_name=None, **kwds):
        """
        Write fields data into hdf/xdmf files. Parallel writing.

        Parameters
        ----------
        variables : dict
            keys = :class:`~hysop.fields.continuous_field.Field`,
            values = CartesianTopologyDescriptors.
        name : str, optional
            Operator name.
        pretty_name : str, optional
            Operator pretty name.
        kwds : dict
            Base class arguments.
        """
        check_instance(
            variables, dict, keys=Field, values=CartesianTopologyDescriptors
        )
        vnames = [f"{field.name}" for field in variables.keys()]
        vpnames = [field.pretty_name for field in variables.keys()]
        name = first_not_None(name, "write_{}".format("_".join(vnames)))
        pname = first_not_None(pretty_name, "write_{}".format("_".join(vpnames)))
        super().__init__(
            input_fields=variables,
            output_fields=None,
            name=name,
            pretty_name=pname,
            **kwds,
        )

        # count the number of calls
        self._count = 0

        self.step = self._step_HDF5
        self._xdmf_data_files = []
        # filename = prefix_N, N = counter value
        self._get_filename = self._input_fname
        # In an xdmf file, the same time value cannot appear twice,
        # so we keep the last time at which the xdmf file was written
        # and raise an exception if a duplicate occurs.
        self._last_written_time = None
        self._xmf_file = None
        if self.io_params.append:
            self.openXMFFile()
        self._data_getters = {}

    def get_work_properties(self, **kwds):
        requests = super().get_work_properties(**kwds)

        max_bytes = 0
        for name, data in self.dataset.items():
            if data.backend.kind == Backend.HOST:
                continue
            if data.backend.kind == Backend.OPENCL:
                from hysop.backend.device.opencl import cl

                if data.backend.device.type == cl.device_type.CPU:
                    continue
            # we need a host buffer to get the data
            max_bytes = max(data.nbytes, max_bytes)
            host_backend = data.backend.host_array_backend

        if max_bytes > 0:
            request = MemoryRequest(
                backend=host_backend, size=max_bytes, dtype=npw.uint8
            )
            requests.push_mem_request(request_identifier="buffer", mem_request=request)
        return requests

    def setup(self, work, **kwds):
        super().setup(work=work, **kwds)
        self._setup_grid_template()
        for name, data in self.dataset.items():
            data = data[self._local_compute_slices[name]]
            if data.backend.kind is Backend.HOST:

                def get_data(data=data.handle):
                    return data

            elif data.backend.kind is Backend.OPENCL:
                from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
                    OpenClCopyBufferRectLauncher,
                )
                from hysop.backend.device.opencl import cl

                if data.backend.device.type == cl.device_type.CPU:

                    def get_data(
                        data=data.handle, queue=data.backend.cl_env.default_queue
                    ):
                        buf = data.map_to_host(
                            queue=queue, is_blocking=True, flags=cl.map_flags.READ
                        )
                        return buf  # unmap is called when buf is destroyed

                else:
                    (buf,) = work.get_buffer(self, "buffer", handle=True)
                    assert buf.dtype == npw.uint8
                    assert buf.size >= data.nbytes
                    buf = buf[: data.nbytes].view(dtype=data.dtype).reshape(data.shape)
                    cpy = OpenClCopyBufferRectLauncher.from_slices(
                        varname=name, src=data, dst=buf
                    )
                    cpy = functools.partial(
                        cpy, queue=data.backend.cl_env.default_queue
                    )

                    def get_data(cpy=cpy, buf=buf):
                        cpy().wait()
                        return buf

            else:
                msg = "Data type not understood or unknown array backend."
                raise NotImplementedError(msg)
            self._data_getters[name] = get_data

    def finalize(self):
        if self._xmf_file:
            filename = self._xmf_file.name
            self._xmf_file.close()
            if self.io_params.dump_is_temporary:
                vprint(f">Deleting XMF file {filename}...")
                os.remove(filename)

    def _input_fname(self, i):
        """Return the output file name for the current iteration."""
        msg = "count < 0, simu must be initialized."
        assert i >= 0, msg
        if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
            # e.g. 'prefix_000042.h5'
            return self.io_params.filename + f"_{i:06d}" + ".h5"
        else:
            assert self.use_local_hdf5
            # e.g. 'prefix_000042_rk0003.h5', one file per rank
            return self.io_params.filename + f"_{i:06d}" + "_rk{rk:04d}.h5"

    @op_apply
    def apply(self, simulation=None, **kwds):
        if simulation is None:
            raise ValueError("Missing simulation value for monitoring.")
        if self.io_params.should_dump(simulation=simulation):
            if self._xmf_file is None:
                self.createXMFFile()
            self.step(simulation)
            self._count += 1

    def _setup_grid_template(self):
        topo = self.topology
        dim = topo.domain.dim
        dx = list(topo.mesh.space_step)
        mesh = self.refmesh
        res = list(mesh.grid_resolution)
        orig = list(topo.domain.origin)
        resolution = [1] * 3
        origin = [0.0] * 3
        step = [0.0] * 3
        idim = 3 - dim
        resolution[idim:] = res
        origin[idim:] = orig
        step[idim:] = dx
        write_resolution = tuple(resolution)
        write_origin = tuple(origin)
        write_step = tuple(step)
        ds_names = self.dataset.keys()
        joinrkfiles = None
        if self.use_local_hdf5 and (self.topology.cart_size > 1):
            joinrkfiles = tuple(range(self.topology.cart_size))
        grid_attributes = XMF.prepare_grid_attributes(
            ds_names, resolution, origin, step, joinrkfiles=joinrkfiles
        )
        self.grid_attributes_template = grid_attributes

    def createXMFFile(self):
        """Create and fill the header of the xdmf file."""
        if self.mpi_params.rank == self.io_params.io_leader:
            f = open(self.io_params.filename + ".xmf", "wb")
            f.write(HDF_Writer.__xmf_header.encode())
            self._last_xmf_pos = f.tell()
            self._xmf_file = f
            f.flush()

    def openXMFFile(self):
        """Open an existing xdmf file."""
        if self.mpi_params.rank == self.io_params.io_leader:
            f = open(self.io_params.filename + ".xmf", "rb+")
            f.seek(-len(HDF_Writer.__xmf_footer), 2)  # Read from file end
            self._last_xmf_pos = f.tell()
            self._xmf_file = f

    def updateXMFFile(self):
        """Update xdmf file."""
        if self.mpi_params.rank == self.io_params.io_leader:
            f = self._xmf_file
            lastp = self._last_xmf_pos
            assert f is not None
            assert lastp is not None
            for i, t in self._xdmf_data_files:
                if (self.topology.cart_size == 1) or self.use_parallel_hdf5:
                    filenames = {"filename": self._get_filename(i).split("/")[-1]}
                else:
                    filenames = {
                        "filename"
                        + str(r): self._get_filename(i).format(rk=r).split("/")[-1]
                        for r in range(self.topology.cart_size)
                    }
                    filenames.update(
                        (
                            "resolution" + str(r),
                            XMF._list_format(self._all_local_grid_resolution[r]),
                        )
                        for r in range(self.topology.cart_size)
                    )
                grid_attrs = self.grid_attributes_template.format(
                    niteration=i, time=t, **filenames
                )
                f.seek(lastp)
                f.write(grid_attrs.encode())
                self._last_xmf_pos = f.tell()
                f.write(HDF_Writer.__xmf_footer.encode())
                f.flush()
            self._xdmf_data_files = []

    def _step_HDF5(self, simu):
        """Write an h5 file with data on each mpi process.

        If the parallel interface of HDF5 is not enabled, each rank writes its
        own h5 file. All files are concatenated in the xmf part with a 'JOIN'
        function. If the parallel interface is enabled, a single h5 file is
        written by all ranks.
        """
        # Remarks:
        # - force np.float64, ParaView seems unable to read float32
        # - writing compressed hdf5 files (gzip compression seems the best)
        # - gzip compression does not work in parallel.

        # Get 'current' filename. It depends on the number
        # of the current output (count) and on the current process
        # rank.
        self._count = simu.current_iteration
        (filename, compression) = self.open_hdf(self._count, mode="w")
        vprint(
            ">Dumping {} {} HDF5 data to {}...".format(
                "compressed" if compression else "uncompressed",
                "parallel" if self.use_parallel_hdf5 else "",
                filename,
            )
        )

        # Get the names of output input_fields and create the corresponding
        # datasets
        if self.use_local_hdf5:
            for name in sorted(self.dataset):
                ds = self._hdf_file.create_dataset(
                    name=name,
                    shape=self._local_grid_resolution,
                    dtype=self.dataset[name].dtype,
                    compression=compression,
                    track_times=False,  # required if we want to compare checksums in tests
                )
                ds[...] = self._data_getters[name]()
        elif self.use_parallel_hdf5:
            for name in sorted(self.dataset):
                ds = self._hdf_file.create_dataset(
                    name=name,
                    shape=self._global_grid_resolution,
                    dtype=self.dataset[name].dtype,
                    compression=compression,
                    track_times=False,  # required if we want to compare checksums in tests
                )
                if compression is None:
                    # no need for collective here because we do not use any filter
                    ds[self._global_compute_slices[name]] = self._data_getters[name]()
                else:
                    with ds.collective:
                        ds[self._global_compute_slices[name]] = self._data_getters[
                            name
                        ]()
        else:
            msg = "Unknown HDF5 mode."
            raise RuntimeError(msg)

        # Collect data required to write the xdmf file
        # --> add tuples (counter, time).
        if simu.t() == self._last_written_time:
            msg = "You cannot write two hdf files for the same "
            msg += "(time, var) set. "
            msg += "If you want to save a field two times for "
            msg += "a single time value, please use two hdf_writer operators."
            raise RuntimeError(msg)
        self._xdmf_data_files.append((self._count, simu.t()))
        self._last_written_time = simu.t()

        self._hdf_file.close()
        self.updateXMFFile()

        if self.io_params.postprocess_dump:
            postprocess_cmd = self.io_params.postprocess_dump
            op_name = self.name
            actual_filepath = self.io_params.filepath
            disk_filepath = self.io_params.disk_filepath
            xmf_file = self._xmf_file.name
            hdf_file = filename
            hdf_is_tmp = self.io_params.dump_is_temporary
            iteration = self._count
            time = self._last_written_time

            vprint(f">Executing postprocessing script: {postprocess_cmd}")
            # execute command OP_NAME ACTUAL_FILEPATH DISK_FILEPATH
            #                 XMF_FILE HDF5_FILE IS_TMP ITERATION TIME
            command = [
                str(postprocess_cmd),
                str(op_name),
                str(actual_filepath),
                str(disk_filepath),
                str(xmf_file),
                str(hdf_file),
                "1" if hdf_is_tmp else "0",
                str(iteration),
                str(time),
            ]
            try:
                subprocess.check_call(command)
            except OSError as e:
                msg = f"\nFATAL ERROR: Could not find or execute postprocessing script '{command[0]}'."
                print(msg)
                print()
                raise
            except subprocess.CalledProcessError as e:
                if e.returncode == 10:
                    msg = (
                        "Postprocessing script has requested to stop the "
                        "simulation (return code 10), exiting."
                    )
                    vprint(msg)
                    sys.exit(0)
                else:
                    msg = "\nFATAL ERROR: Failed to call I/O postprocessing command.\n{}\n"
                    msg = msg.format(" ".join(command))
                    print(msg)
                    print()
                    raise

        if self.io_params.dump_is_temporary:
            vprint(f">Deleting HDF5 data {filename}...")
            os.remove(filename)

        del self._xdmf_data_files[:]
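
# Hedged sketch (editorial illustration, not part of the module) of a script
# that ``io_params.postprocess_dump`` could point to. The argument order follows
# the ``command`` list built in HDF_Writer._step_HDF5; the stop criterion below
# is purely hypothetical.
#
#     #!/usr/bin/env python3
#     import sys
#
#     (op_name, actual_filepath, disk_filepath,
#      xmf_file, hdf_file, is_tmp, iteration, time) = sys.argv[1:9]
#     print(f"post-processing {hdf_file} (operator {op_name}, "
#           f"iteration {iteration}, t={time}, temporary={is_tmp})")
#     # returning 10 asks HDF_Writer to stop the simulation cleanly
#     if float(time) >= 10.0:
#         sys.exit(10)
#     sys.exit(0)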


class HDF_Reader(HDF_IO):
    """
    Parallel reading of hdf/xdmf files to fill some fields in.
    """

    def __new__(cls, variables, restart=None, name=None, **kwds):
        return super().__new__(
            cls, input_fields=None, output_fields=None, name=name, **kwds
        )

    def __init__(self, variables, restart=None, name=None, **kwds):
        """Read some fields data from hdf/xdmf files. Parallel reading.

        Parameters
        ----------
        variables : dict
            keys = :class:`~hysop.fields.continuous_field.Field`,
            values = CartesianTopologyDescriptors.
        restart : int, optional
            Number of a specific iteration to be read.
            Default=None, i.e. read the first iteration.
        kwds : dict
            Base class arguments.

        Notes
        -----
        restart corresponds to the number which appears in the hdf file name,
        i.e. the number of the iteration at which writing occurred.
        See examples in tests_hdf_io.py.
        """
        vnames = [f"{var.name[:3]}[{topo.id}]" for var, topo in variables.items()]
        name = name or "read_{}".format(",".join(vnames))
        super().__init__(input_fields=None, output_fields=variables, name=name, **kwds)
        self.restart = restart
        if self.restart is not None:
            # filename = prefix_N, N = counter value
            # (zero-padding matches the naming used by HDF_Writer._input_fname)
            self._get_filename = (
                lambda i=self.restart: self.io_params.filename + f"_{i:06d}" + ".h5"
            )
        else:
            self._get_filename = lambda i=None: self.io_params.filename

    @op_apply
    def apply(self, simulation=None, **kwds):
        # Read HDF file
        self.open_hdf(count=self.restart, mode="r")

        # Get the list of dataset names available in the hdf file
        dsnames = self.dataset_names()
        # And check if the required datasets (from self.dataset)
        # are all in this list.
        msg = "You try to read a dataset not present in hdf file : "
        for name in self.dataset:
            assert name in dsnames, msg + name
            self.dataset[name][self._local_compute_slices[name]] = self._hdf_file[
                name
            ][self._global_compute_slices[name]]
        self._hdf_file.close()
        self._hdf_file = None

    @discretized
    def dataset_names(self):
        """Return the list of available names for datasets in
        the required file.
        """
        if self._hdf_file is None:
            self.open_hdf(count=self.restart, mode="r")
        return self._hdf_file.keys()

    def finalize(self):
        if self._hdf_file is not None:
            self._hdf_file.close()
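
# Hedged usage sketch (editorial illustration, not part of the module): writing
# a field with HDF_Writer and reading one of its dumps back with HDF_Reader.
# ``velo`` (a Field) and ``topo`` (a CartesianTopologyDescriptors entry) are
# assumed to be defined by the calling problem.
#
#     io_params = IOParams('fields', fileformat=IO.HDF5)
#     writer = HDF_Writer(variables={velo: topo}, io_params=io_params)
#     # ... later, re-read the dump written at iteration 12, i.e.
#     # 'fields_000012.h5' with the writer's zero-padded naming:
#     reader = HDF_Reader(variables={velo: topo}, restart=12, io_params=io_params)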